package com.shujia.ads

import com.shujia.util.FlinkTool

object AdsSkuIndex extends FlinkTool{
  /**
    * 需要在子类实现抽象方法
    *
    */
  override def run(args: Array[String]): Unit = {
    table.executeSql(
      """
        |insert into gma_ads.ads_sku_index
        |select
        |a.sku_id ,
        |b.weight,
        |b.tm_name,
        |b.price,
        |b.spu_id,
        |b.category3_name,
        |b.category2_name,
        |b.category1_name,
        |a.order_count,
        |a.order_amount,
        |a.sku_count,
        |a.dt
        |from
        |(select
        |sku_id,
        |DATE_FORMAT(pay_time,'yyyy-MM-dd') as dt,
        |count(order_id) as order_count,
        |sum(sku_num*order_price) as order_amount,
        |sum(sku_num) as sku_count
        |from
        |gma_dwd.dwd_paid_order_detail /*+ OPTIONS('scan.startup.mode'='earliest-offset') */
        |where pay_time is not null
        |group by sku_id,DATE_FORMAT(pay_time,'yyyy-MM-dd'))
        |as a
        |join
        |gma_dim.dim_kafka_mysql_sku_cdc  as b
        |on a.sku_id = b.id
        |
        |
      """.stripMargin)
  }
}
